asyncio programming

Installs: 72
Rank: #10696

Installation

npx skills add https://github.com/pluginagentmarketplace/custom-plugin-python --skill 'Asyncio Programming'

Asyncio Programming

Overview

Master asynchronous programming in Python with asyncio. Learn to write concurrent code that efficiently handles I/O-bound operations, build async web applications, and understand the async/await paradigm.

Learning Objectives

- Understand asynchronous programming concepts
- Write async functions with async/await syntax
- Manage concurrent operations with asyncio
- Build async web applications
- Handle async I/O operations efficiently
- Debug and test async code

Core Topics

1. Async/Await Basics

- Understanding coroutines
- async/await syntax
- Event loop fundamentals
- Running async functions
- Async vs sync execution
- Common pitfalls

Code Example:

    import asyncio
    import time

    # Synchronous version (slow)
    def fetch_data_sync(url):
        print(f"Fetching {url}...")
        time.sleep(2)  # Simulating network delay
        return f"Data from {url}"

    def main_sync():
        urls = ['url1', 'url2', 'url3']
        results = []
        for url in urls:
            data = fetch_data_sync(url)
            results.append(data)
        return results

    # Takes 6 seconds (2 * 3)
    start = time.time()
    main_sync()
    print(f"Sync took: {time.time() - start:.2f}s")

    # Asynchronous version (fast)
    async def fetch_data_async(url):
        print(f"Fetching {url}...")
        await asyncio.sleep(2)  # Non-blocking sleep
        return f"Data from {url}"

    async def main_async():
        urls = ['url1', 'url2', 'url3']

        # Create tasks and run concurrently
        tasks = [fetch_data_async(url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

    # Takes 2 seconds (concurrent execution)
    start = time.time()
    asyncio.run(main_async())
    print(f"Async took: {time.time() - start:.2f}s")

2. Asyncio Tasks & Coroutines

- Creating and managing tasks
- asyncio.gather() vs asyncio.wait()
- Task cancellation
- Task groups (Python 3.11+)
- Exception handling in tasks
- Timeouts

Code Example:

    import asyncio

    async def process_item(item_id, delay):
        print(f"Processing item {item_id}")
        await asyncio.sleep(delay)
        if item_id == 3:
            raise ValueError(f"Item {item_id} failed!")
        return f"Result {item_id}"

    async def main():

        # Method 1: gather (returns results in order)
        tasks = [
            process_item(1, 1),
            process_item(2, 2),
            process_item(3, 1),
        ]
        try:
            results = await asyncio.gather(*tasks, return_exceptions=True)
            for i, result in enumerate(results):
                if isinstance(result, Exception):
                    print(f"Task {i} failed: {result}")
                else:
                    print(f"Task {i} result: {result}")
        except Exception as e:
            print(f"Error: {e}")

        # Method 2: wait (returns done/pending sets)
        tasks = [asyncio.create_task(process_item(i, i)) for i in range(1, 4)]
        done, pending = await asyncio.wait(tasks, timeout=2.5)
        print(f"Completed: {len(done)}, Pending: {len(pending)}")

        # Cancel pending tasks, then wait for the cancellations to finish
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)

        # Method 3: Task groups (Python 3.11+)
        try:
            async with asyncio.TaskGroup() as tg:
                for i in range(1, 4):
                    tg.create_task(process_item(i, 1))
            # Reached only if all tasks completed without raising
        except* ValueError as eg:
            # Item 3 raises, so the group cancels any tasks still running
            # and re-raises the failures as an ExceptionGroup
            for exc in eg.exceptions:
                print(f"Task group error: {exc}")

    asyncio.run(main())

3. Async I/O Operations

- Async file operations (aiofiles)
- Async HTTP requests (aiohttp)
- Async database operations (asyncpg, motor)
- Async messaging (aio-pika)
- Streams and protocols

Code Example:

    import asyncio
    import aiohttp
    import aiofiles
    from typing import List

    # Async HTTP requests
    async def fetch_url(session, url):
        async with session.get(url) as response:
            return await response.text()

    async def fetch_multiple_urls(urls: List[str]):
        async with aiohttp.ClientSession() as session:
            tasks = [fetch_url(session, url) for url in urls]
            results = await asyncio.gather(*tasks)
            return results

    # Async file operations
    async def read_file_async(filepath):
        async with aiofiles.open(filepath, 'r') as f:
            content = await f.read()
            return content

    async def write_file_async(filepath, content):
        async with aiofiles.open(filepath, 'w') as f:
            await f.write(content)

    # Async database operations (example with asyncpg)
    import asyncpg

    async def fetch_users():
        conn = await asyncpg.connect(
            user='user',
            password='password',
            database='mydb',
            host='localhost'
        )
        try:
            rows = await conn.fetch('SELECT * FROM users')
            return rows
        finally:
            await conn.close()

    # Usage
    async def main():
        # Fetch URLs concurrently
        urls = [
            'https://api.example.com/data1',
            'https://api.example.com/data2',
            'https://api.example.com/data3',
        ]
        results = await fetch_multiple_urls(urls)

        # Read/write files
        content = await read_file_async('input.txt')
        await write_file_async('output.txt', content.upper())

        # Database operations
        users = await fetch_users()
        print(f"Found {len(users)} users")

    asyncio.run(main())

4. Async Web Frameworks

- FastAPI async routes
- aiohttp web server
- WebSocket handling
- Background tasks
- Middleware and dependencies

Code Example:

    # FastAPI async example
    # fetch_user_from_db and save_order are placeholders assumed to be
    # defined elsewhere in the application.
    from fastapi import FastAPI, BackgroundTasks
    import asyncio

    app = FastAPI()

    # Async route
    @app.get("/users/{user_id}")
    async def get_user(user_id: int):
        # Async database call
        user = await fetch_user_from_db(user_id)
        return user

    # Background task
    async def send_notification(email: str, message: str):
        await asyncio.sleep(2)  # Simulate email sending
        print(f"Sent email to {email}: {message}")

    @app.post("/orders/")
    async def create_order(order_data: dict, background_tasks: BackgroundTasks):
        # Process order synchronously
        order_id = save_order(order_data)

        # Send notification in background
        background_tasks.add_task(
            send_notification,
            order_data['customer_email'],
            f"Order #{order_id} created"
        )
        return {"order_id": order_id}

    # aiohttp web server (a separate application from the FastAPI example)
    from aiohttp import web

    async def handle_request(request):
        name = request.match_info.get('name', 'Anonymous')
        await asyncio.sleep(1)  # Async operation
        return web.json_response({'message': f'Hello {name}'})

    # WebSocket example
    async def websocket_handler(request):
        ws = web.WebSocketResponse()
        await ws.prepare(request)

        async for msg in ws:
            if msg.type == web.WSMsgType.TEXT:
                await ws.send_str(f"Echo: {msg.data}")
            elif msg.type == web.WSMsgType.ERROR:
                print(f'Error: {ws.exception()}')

        return ws

    app = web.Application()
    # Register '/ws' before the dynamic '/{name}' route so the dynamic
    # route cannot shadow it.
    app.add_routes([web.get('/ws', websocket_handler)])
    app.add_routes([web.get('/{name}', handle_request)])

Hands-On Practice

Project 1: Async Web Scraper

Build a concurrent web scraper with rate limiting.

Requirements:

- Scrape multiple websites concurrently
- Implement rate limiting
- Handle errors gracefully
- Save results to a database asynchronously
- Track progress
- Retry failed requests

Key Skills: aiohttp, async I/O, error handling

Project 2: Real-time Chat Server

Create a WebSocket-based chat application.

Requirements:

- WebSocket server with aiohttp
- Multiple chat rooms
- User authentication
- Message broadcasting
- Connection management
- Message history persistence

Key Skills: WebSockets, async server, state management

Project 3: Async Task Queue

Build a distributed task processing system.
Requirements:

- Task queue with Redis/RabbitMQ
- Worker pool management
- Task prioritization
- Result caching
- Progress monitoring
- Graceful shutdown

Key Skills: Message queues, concurrent workers, cleanup

Assessment Criteria

- Understand async/await semantics
- Write efficient concurrent code
- Handle async exceptions properly
- Use asyncio tasks effectively
- Build async web applications
- Debug async code
- Manage async resources (cleanup)

Resources

Official Documentation

- asyncio Docs - Official documentation
- aiohttp - Async HTTP client/server
- FastAPI - Modern async web framework

Learning Platforms

- Using Asyncio in Python - Real Python guide
- Python Concurrency - O'Reilly book
- Asyncio Recipes - Practical examples

Tools

- aiohttp - Async HTTP
- aiofiles - Async file I/O
- asyncpg - Async PostgreSQL
- aiodebug - Async debugging

Next Steps

After mastering asyncio, explore:

- Multiprocessing - CPU-bound parallelism
- Celery - Distributed task queue
- gRPC - Async RPC framework
- Kafka - Async event streaming
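
Sketch for Project 1: rate limiting and retries

The rate-limiting and retry requirements of the web scraper project could be approached roughly as follows. This is a minimal sketch, not a reference solution: it uses asyncio.Semaphore to cap the number of in-flight requests (a simple form of rate limiting, not a strict requests-per-second limit) and retries with exponential backoff. The names fake_fetch, fetch_with_retry, scrape, MAX_CONCURRENCY and MAX_RETRIES are hypothetical, and fake_fetch is a stand-in for a real aiohttp request so the example runs without network access.

```python
import asyncio

MAX_CONCURRENCY = 2  # assumed cap on simultaneous requests
MAX_RETRIES = 3      # assumed number of attempts per URL


async def fake_fetch(url, attempts):
    """Hypothetical stand-in for an HTTP request.

    URLs containing 'flaky' fail on their first attempt only.
    """
    attempts[url] = attempts.get(url, 0) + 1
    await asyncio.sleep(0.01)  # Simulated network delay
    if "flaky" in url and attempts[url] < 2:
        raise ConnectionError(f"temporary failure for {url}")
    return f"content of {url}"


async def fetch_with_retry(url, semaphore, attempts,
                           retries=MAX_RETRIES, base_delay=0.01):
    """Fetch one URL, holding the semaphore only while a request is in flight."""
    for attempt in range(1, retries + 1):
        try:
            async with semaphore:
                return await fake_fetch(url, attempts)
        except ConnectionError:
            if attempt == retries:
                raise  # Out of attempts: let the caller see the error
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


async def scrape(urls):
    """Fetch all URLs concurrently; failures are returned, not raised."""
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    attempts = {}
    results = await asyncio.gather(
        *(fetch_with_retry(url, semaphore, attempts) for url in urls),
        return_exceptions=True,
    )
    return dict(zip(urls, results)), attempts


if __name__ == "__main__":
    pages, tries = asyncio.run(scrape(["a", "flaky-b", "c"]))
    for url, page in pages.items():
        print(url, "->", page, f"({tries[url]} attempt(s))")
```

Releasing the semaphore before the backoff sleep keeps a retrying URL from occupying a slot that another request could use. In a real scraper, fake_fetch would be replaced by a call on a shared aiohttp.ClientSession.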

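Sketch for Project 3: worker pool with prioritization and graceful shutdown

The worker pool, task prioritization and graceful shutdown requirements of the task queue project can be illustrated in-process. This is a sketch under stated assumptions: asyncio.PriorityQueue stands in for an external broker such as Redis or RabbitMQ, and the names worker and run_pool are hypothetical. Lower priority numbers are processed first.

```python
import asyncio


async def worker(name, queue, results):
    """Process (priority, task_id) items until the task is cancelled."""
    while True:
        priority, task_id = await queue.get()
        try:
            await asyncio.sleep(0.01)  # Simulated work
            results.append((priority, task_id, name))
        finally:
            # Always mark the item done so queue.join() can return
            queue.task_done()


async def run_pool(tasks, num_workers=3):
    """Run all tasks through a fixed-size worker pool, then shut it down."""
    queue = asyncio.PriorityQueue()
    results = []
    for priority, task_id in tasks:
        queue.put_nowait((priority, task_id))

    workers = [
        asyncio.create_task(worker(f"worker-{i}", queue, results))
        for i in range(num_workers)
    ]

    # Graceful shutdown: wait for every queued item to be processed,
    # then cancel the idle workers and wait for the cancellations.
    await queue.join()
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    done = asyncio.run(run_pool([(2, "low"), (1, "high"), (3, "lowest")],
                                num_workers=1))
    for priority, task_id, name in done:
        print(f"{name} handled {task_id} (priority {priority})")
```

Waiting on queue.join() before cancelling means no task is interrupted mid-work; the workers are only cancelled while idle in queue.get(). A distributed version would replace the in-process queue with a broker client and would need its own acknowledgement and redelivery handling.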